package com.joysuccess.consumer.snmp.service;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer for the BACnet protocol topic.
 *
 * <p>Each record is expected to carry a JSON object with the fields
 * {@code assetId}, {@code objectName} and {@code presentValue}; the value is
 * stored in a Redis hash keyed by {@code assetId}.
 *
 * @author zhangqing
 * @date 2019-05-04
 */
@Slf4j
@Component
public class BacnetConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(BacnetConsumer.class);

    /** Name of the Kafka topic this consumer subscribes to. */
    private static final String BACNET_KAFKA_TOPIC = "lengyuan-bacnet-datas";

    // NOTE(review): intentionally left as a raw type; narrowing the generics could
    // change which RedisTemplate bean Spring injects -- confirm before tightening.
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Consumes one BACnet data record and writes it to Redis as
     * {@code HSET <assetId> <objectName> <presentValue>}.
     *
     * <p>Only Redis is written by this method. Records with a null/empty payload
     * or with any of the three fields missing are logged and skipped, because
     * they cannot be stored under a meaningful key. JSON syntax errors are not
     * caught here and propagate to the listener container.
     *
     * @param record the Kafka record whose value is the JSON payload
     */
    @KafkaListener(topics = BACNET_KAFKA_TOPIC, groupId = "BacnetConsumerGroup")
    public void onMessageRedisString(ConsumerRecord<?, String> record) {
        String value = record.value();
        JSONObject jsonObject = (value == null) ? null : JSONObject.parseObject(value);
        if (jsonObject == null) {
            LOGGER.warn("Skipping BACnet record with empty payload: topic={}, partition={}, offset={}",
                    record.topic(), record.partition(), record.offset());
            return;
        }

        // getString converts non-string JSON scalars (numbers, booleans) instead of
        // failing with a ClassCastException the way a hard (String) cast does.
        String objectName = jsonObject.getString("objectName");
        String presentValue = jsonObject.getString("presentValue");
        String assetId = jsonObject.getString("assetId");

        if (assetId == null || objectName == null || presentValue == null) {
            LOGGER.warn("Skipping incomplete BACnet record: topic={}, partition={}, offset={}, payload={}",
                    record.topic(), record.partition(), record.offset(), value);
            return;
        }

        redisTemplate.opsForHash().put(assetId, objectName, presentValue);
    }
}
